package io.sundial.scheduler.impl;

import io.sundial.coordination.CallbackException;
import io.sundial.coordination.CallbackStatus;
import io.sundial.coordination.Coordinator;
import io.sundial.coordination.CoordinatorException;
import io.sundial.coordination.node.Node;
import io.sundial.coordination.tree.TreeEvent;
import io.sundial.coordination.tree.TreeWatcher;
import io.sundial.coordination.watching.EventWatching;
import io.sundial.core.Callback;
import io.sundial.engine.exception.ShuttingException;
import io.sundial.engine.exception.StartingException;
import io.sundial.job.JobResult;
import io.sundial.job.JobStatus;
import io.sundial.task.Scheduling;
import io.sundial.task.Triggering;

/**
 * 应变调度器
 *
 * @author Payne 646742615@qq.com
 * 2018/12/26 16:58
 */
public abstract class ReactingScheduler extends ElectingScheduler {
    private EventWatching replyTreeWatching;
    private Callback<Coordinator.DeleteResult, CallbackException> triggeringDeleteCallback = new TriggeringDeleteCallback();
    private Callback<Coordinator.DeleteResult, CallbackException> schedulingDeleteCallback = new SchedulingDeleteCallback();

    @Override
    protected void starting() throws StartingException {
        super.starting();

        try {
            replyTreeWatching = coordinator.watch(COMMANDS_ROOT, new ReplyTreeWatcher(), false);
        } catch (CoordinatorException e) {
            throw new StartingException(e);
        }
    }

    @Override
    protected void onPowerAcquired() throws Exception {
        super.onPowerAcquired();

        replyTreeWatching.start();
    }

    @Override
    protected void onPowerReleased() throws Exception {
        super.onPowerReleased();

        replyTreeWatching.pause();
    }

    @Override
    protected void shutting() throws ShuttingException {
        super.shutting();

        try {
            replyTreeWatching.close();
        } catch (CoordinatorException e) {
            throw new ShuttingException(e);
        }
    }

    private class ReplyTreeWatcher implements TreeWatcher {

        @Override
        public void onWatched(TreeEvent event) throws Exception {
            TreeEvent.Type type = event.getType();
            switch (type) {
                case NODE_CREATED:
                    Node node = event.getNode();
                    if (node == null) {
                        return;
                    }

                    String path = node.getPath();
                    String[] paths = path.split(SPRT);
                    if (paths.length != 8) {
                        return;
                    }

                    // /commands/{Executor}/{TaskGroup}/{TaskName}/{Time}/{ShardingIndex}/{JobStatus}
                    String executor = paths[2];
                    String taskGroup = paths[3];
                    String taskName = paths[4];
                    Long time = Long.valueOf(paths[5]);
                    Integer shardingIndex = Integer.valueOf(paths[6]);
                    JobStatus status = JobStatus.valueOf(paths[7]);

                    Triggering.Key triggeringKey = new Triggering.Key(taskName, taskGroup, time, shardingIndex);

                    switch (status) {
                        case SUBMITTED:
                            onJobSubmitted(triggeringKey);
                            break;
                        case REJECTED:
                            onJobRejected(triggeringKey);
                            break;
                        case CANCELED:
                            onJobCanceled(triggeringKey);
                            break;
                        case EXECUTING:
                            onJobExecuting(triggeringKey);
                            break;
                        case COMPLETED:
                            onJobCompleted(triggeringKey, node);
                            break;
                        case FAILED:
                            onJobFailed(triggeringKey, node);
                            break;
                    }

                    // 如果状态是最终状态，即代表该分片的调度状态不会再更新，所以删除该分片命令节点
                    if (status.immutable) {
                        coordinator.delete(
                                COMMANDS_ROOT + SPRT + executor + SPRT + taskGroup + SPRT + taskName + SPRT + time + SPRT + shardingIndex,
                                -1,
                                false,
                                true,
                                triggeringDeleteCallback,
                                triggeringKey
                        );
                    }
                    break;
                default:
                    break;
            }
        }

        private void onJobSubmitted(Triggering.Key key) {
            triggeringRepository.toSubmitted(key);
        }

        private void onJobRejected(Triggering.Key key) {
            triggeringRepository.toRejected(key);
        }

        private void onJobCanceled(Triggering.Key key) {
            triggeringRepository.toCanceled(key);
        }

        private void onJobExecuting(Triggering.Key key) {
            triggeringRepository.toExecuting(key);
        }

        private void onJobCompleted(Triggering.Key key, Node node) throws Exception {
            byte[] data = node.getData();
            JobResult jobResult = data != null ? unmarshal(data, JobResult.class) : null;
            triggeringRepository.toCompleted(
                    key,
                    jobResult != null ? jobResult.getCode() : -1,
                    jobResult != null ? jobResult.getMessage() : null,
                    data != null ? new String(data) : null
            );
        }

        private void onJobFailed(Triggering.Key key, Node node) throws Exception {
            byte[] data = node.getData();
            String result = data != null ? unmarshal(data, String.class) : null;
            triggeringRepository.toFailed(key, result);
        }
    }

    /**
     * 触发节点删除回调
     */
    private class TriggeringDeleteCallback implements Callback<Coordinator.DeleteResult, CallbackException> {

        @Override
        public void call(boolean success, Coordinator.DeleteResult result, CallbackException exception) throws Exception {
            if (success) {
                logger.info("success deleting triggering node {}", result.getPath());
                // 尝试删除调度节点，如果存在子节点即证明调度还没最终完成，也无法删除。
                // /commands/{Executor}/{TaskGroup}/{TaskName}/{Time}/{ShardingIndex}
                String path = result.getPath();
                String[] paths = path.split(SPRT);
                String executor = paths[2];
                String taskGroup = paths[3];
                String taskName = paths[4];
                Long time = Long.valueOf(paths[5]);
                Scheduling.Key schedulingKey = new Scheduling.Key(taskName, taskGroup, time);
                coordinator.delete(
                        COMMANDS_ROOT + SPRT + executor + SPRT + taskGroup + SPRT + taskName + SPRT + time,
                        -1,
                        false,
                        false,
                        schedulingDeleteCallback,
                        schedulingKey
                );
            } else {
                Triggering.Key triggeringKey = (Triggering.Key) exception.getContext();
                logger.warn("error occurred while deleting triggering {} with code {}", triggeringKey, exception.getCode());
            }
        }
    }

    /**
     * 调度节点删除回调
     */
    private class SchedulingDeleteCallback implements Callback<Coordinator.DeleteResult, CallbackException> {

        @Override
        public void call(boolean success, Coordinator.DeleteResult result, CallbackException exception) {
            if (success) {
                logger.info("success deleting scheduling node {}", result.getPath());
                Scheduling.Key schedulingKey = (Scheduling.Key) result.getContext();
                schedulingRepository.toFinished(schedulingKey);
            } else if (exception.getCode() != CallbackStatus.CODE_NOT_EMPTY) {
                Scheduling.Key schedulingKey = (Scheduling.Key) exception.getContext();
                logger.warn("error occurred while deleting scheduling {} with code {}", schedulingKey, exception.getCode());
            }
        }
    }

}
